Source code for hysop.symbolic.field

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import abstractmethod
import sympy as sm

from hysop.constants import BoundaryCondition

from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.sympy_utils import get_derivative_variables
from hysop.tools.numerics import find_common_dtype

from hysop.fields.continuous_field import Field, TensorField
from hysop.fields.discrete_field import DiscreteField, DiscreteTensorField
from hysop.symbolic import Symbol
from hysop.symbolic.base import TensorBase, SymbolicTensor
from hysop.symbolic.func import (
    UndefinedFunction,
    AppliedSymbolicFunction,
    FunctionBase,
    SymbolicFunctionTensor,
)
from hysop.domain.domain import Domain


[docs] class FieldExpressionI: def __new__(cls, *args, **kwds): return super().__new__(cls, *args, **kwds) def __init__(self, *args, **kwds): super().__init__(**kwds)
[docs] @abstractmethod def lboundaries(self): pass
[docs] @abstractmethod def rboundaries(self): pass
[docs] @abstractmethod def domain(self): pass
[docs] @abstractmethod def dtype(self): pass
@property def boundaries(self): return (self.lboundaries, self.rboundaries)
[docs] def format_boundaries(self): from hysop.constants import format_boundaries as fb return fb(*self.boundaries)
[docs] class FieldExpression(FieldExpressionI): def __new__(cls, *args, **kwds): domain = kwds.pop("domain", None) dtype = kwds.pop("dtype", None) lboundaries = kwds.pop("lboundaries", None) rboundaries = kwds.pop("rboundaries", None) obj = super().__new__(cls, *args, **kwds) obj._domain = domain obj._dtype = dtype obj._lboundaries = lboundaries obj._rboundaries = rboundaries return obj def __init__(self, *args, **kwds): super().__init__(*args, **kwds) @property def lboundaries(self): assert self._lboundaries is not None return self._lboundaries @lboundaries.setter def lboundaries(self, lb): check_instance( lb, npw.ndarray, values=BoundaryCondition, size=self.domain.dim, ndim=1 ) self._lboundaries = lb @property def rboundaries(self): assert self._rboundaries is not None return self._rboundaries @rboundaries.setter def rboundaries(self, rb): check_instance( rb, npw.ndarray, values=BoundaryCondition, size=self.domain.dim, ndim=1 ) self._rboundaries = rb @property def domain(self): assert self._domain is not None return self._domain @domain.setter def domain(self, dom): assert self._domain is None check_instance(dom, Domain) self._domain = dom @property def dtype(self): assert self._dtype is not None return self._dtype @dtype.setter def dtype(self, dt): assert self._dtype is None check_instance(dt, npw.dtype) self._dtype = dt
[docs] class DerivativeFieldExpr(FieldExpression, sm.Derivative): pass
[docs] class FieldExpressionBuilder:
[docs] class BoundaryIncompatibilityError(ValueError): pass
[docs] class InvalidExpression(ValueError): pass
[docs] @classmethod def is_field_expr(cls, expr): return isinstance(expr, FieldExpressionI)
[docs] @classmethod def update_boundaries(cls, boundary, order): from hysop.constants import BoundaryCondition if (order % 2) == 0: return boundary elif boundary is BoundaryCondition.PERIODIC: return BoundaryCondition.PERIODIC elif boundary is BoundaryCondition.HOMOGENEOUS_DIRICHLET: return BoundaryCondition.HOMOGENEOUS_NEUMANN elif boundary is BoundaryCondition.HOMOGENEOUS_NEUMANN: return BoundaryCondition.HOMOGENEOUS_DIRICHLET else: msg = "FATAL ERROR: Unknown boundary condition {}." msg = msg.format(bd) raise NotImplementedError(msg)
[docs] @classmethod def to_field_expression(cls, expr, space_symbols, strict=True): check_instance(expr, sm.Expr) def _to_field_expression_impl(expr): if cls.is_field_expr(expr): return expr elif isinstance(expr, sm.Derivative): e = _to_field_expression_impl(expr.args[0]) if cls.is_field_expr(e): dtype, domain = e.dtype, e.domain lb, rb = ( e.lboundaries.copy(), e.rboundaries.copy(), ) assert len(space_symbols) == lb.size == rb.size for xi in get_derivative_variables(expr): assert xi in space_symbols, xi i = space_symbols[::-1].index(xi) lb[i] = cls.update_boundaries(lb[i], +1) rb[i] = cls.update_boundaries(rb[i], +1) expr = DerivativeFieldExpr(e, *expr.args[1:]) expr.domain = domain expr.dtype = dtype expr.lboundaries = lb expr.rboundaries = rb return expr else: return expr else: func = expr.func args = tuple(_to_field_expression_impl(a) for a in expr.args) field_expression_args = tuple( filter(lambda x: cls.is_field_expr(x), args) ) if field_expression_args: try: return cls.make_expr(func, *args) except cls.BoundaryIncompatibilityError: msg = f"\nError during the handling of expression {expr}." msg += "\nSome boundaries were not compatible:" msg += "\n *" + "\n *".join( f"{a}: {a.format_boundaries()}" for a in field_expression_args ) raise cls.BoundaryIncompatibilityError(msg) else: return expr fexpr = _to_field_expression_impl(expr) if strict and (not cls.is_field_expr(fexpr)): msg = f"\nError during the handling of expression {expr}." msg += "\nCould not determine boundaries because no FieldExpression " msg += "was present in expression." raise cls.InvalidExpression(msg) return fexpr
[docs] @classmethod def make_expr(cls, func, *args): check_instance(func, type) field_expression_args = tuple(filter(lambda x: cls.is_field_expr(x), args)) if not field_expression_args: msg = "No FieldExpression arguments present in args." raise ValueError(msg) if not cls.check_boundary_compatibility(*field_expression_args): raise cls.BoundaryIncompatibilityError fea0 = field_expression_args[0] new_func = type(func.__name__ + "FieldExpr", (FieldExpression, func), {}) new_expr = new_func(*args) new_expr.dtype = npw.dtype( find_common_dtype(*tuple(a.dtype for a in field_expression_args)) ) new_expr.domain = fea0.domain new_expr.lboundaries = fea0.lboundaries.copy() new_expr.rboundaries = fea0.rboundaries.copy() return new_expr
[docs] @classmethod def check_boundary_compatibility(cls, arg0, *args): check_instance(args, tuple, values=FieldExpressionI) domain, lb, rb = arg0.domain, arg0.lboundaries, arg0.rboundaries if args: match = all((domain == a.domain) for a in args) match &= all(all(lb == a.lboundaries) for a in args) match &= all(all(rb == a.rboundaries) for a in args) return match else: return True
[docs] class FieldBase(FunctionBase): def _sympy_(self): """for sympify""" return self def __new__(cls, field, idx=None, **kwds): assert "name" not in kwds assert "pretty_name" not in kwds assert "latex_name" not in kwds assert "var_name" not in kwds check_instance(field, (Field, DiscreteField)) assert (field.nb_components == 1) or (idx is not None), ( field.nb_components, idx, ) index = first_not_None(idx, [0])[0] name = field.name pretty_name = field.pretty_name var_name = field.var_name latex_name = field.latex_name assert 0 <= index < field.nb_components, index try: obj = super().__new__( cls, name=name, pretty_name=pretty_name, var_name=var_name, latex_name=latex_name, **kwds, ) except TypeError: obj = super().__new__(cls, name=name, **kwds) obj._field = field obj._index = index return obj def __init__(self, field, idx=None, **kwds): try: super().__init__( name=None, pretty_name=None, var_name=None, latex_name=None, **kwds ) except TypeError: super().__init__(name=None, **kwds) def _hashable_content(self): """See sympy.core.basic.Basic._hashable_content()""" hc = super()._hashable_content() hc += ( self._field, self._index, ) return hc @property def field(self): """Get associated field.""" return self._field @property def index(self): """Get component index of the target field.""" return self._index @property def indexed_field(self): """Get a unique identifier for an indexed field component.""" return (self._field, self._index) def __getitem__(self, key): assert key == 0 return self
[docs] class SymbolicDiscreteField(FieldBase, Symbol): """ Symbolic discrete field symbol. """ def __new__(cls, field, name=None, fn=None, **kwds): check_instance(field, DiscreteField) return super().__new__(cls, field=field, fn=fn, **kwds) def __init__(self, field, name=None, fn=None, **kwds): super().__init__(field=field, fn=fn, **kwds)
[docs] @classmethod def from_field(cls, field): if field.nb_components == 1: return cls(field=field) else: return SymbolicDiscreteFieldTensor(field=field)
[docs] class SymbolicField(FieldBase, UndefinedFunction): """ Symbolic unapplied scalar field as an undefined function of some frame coordinates and time. This is a metaclass. """ def __new__(cls, field, fn=None, bases=None, **kwds): bases = first_not_None(bases, (AppliedSymbolicField,)) check_instance(field, Field) return super().__new__(cls, bases=bases, field=field, fn=fn, **kwds) def __init__(self, field, fn=None, bases=None, **kwds): super().__init__(bases=bases, field=field, fn=fn, **kwds)
[docs] def __hash__(self): "Fix sympy v1.2 hashes" h = super().__hash__() for hc in (self.field, self.index): h ^= hash(h) return h
[docs] def __eq__(self, other): "Fix sympy v1.2 eq" eq = super().__eq__(other) if eq is not True: return eq for lhc, rhc in zip((self.field, self.index), (other.field, other.index)): eq &= lhc == rhc return eq
[docs] def __ne__(self, other): "Fix sympy v1.2 neq" return not (self == other)
[docs] class AppliedSymbolicField(FieldExpressionI, AppliedSymbolicFunction): """Applied scalar fields, hold a reference to a continuous field.""" def __new__(cls, *args, **kwds): args = args if args else cls.field.domain.frame.vars return super().__new__(cls, *args, **kwds) def __init__(self, *args, **kwds): super().__init__(*args, **kwds) def _sympy_(self): """for sympify""" return self def _hashable_content(self): """See sympy.core.basic.Basic._hashable_content()""" hc = super()._hashable_content() hc += ( self.field, self.index, ) return hc @property def field(self): return type(self).field @property def index(self): """Get component index of the target field.""" return type(self).index @property def indexed_field(self): """Get a unique identifier for an indexed field component.""" return (self.field, self.index) @property def lboundaries(self): return self.field.lboundaries @property def rboundaries(self): return self.field.rboundaries @property def domain(self): return self.field.domain @property def dtype(self): return self.field.dtype
[docs] class SymbolicFieldTensor(SymbolicFunctionTensor): """Symbolic tensor symbol.""" def __new__(cls, field, **kwds): check_instance(field, TensorField) shape = field.shape init = npw.empty(shape=shape, dtype=object) for idx, field in field.nd_iter(): init[idx] = field.symbol return super().__new__(cls, shape=shape, init=init) def __init__(self, field, **kwds): super().__init__(shape=None, init=None)
[docs] class SymbolicDiscreteFieldTensor(TensorBase): """Symbolic tensor symbol.""" def __new__(cls, dfield, name=None, **kwds): from hysop.fields.discrete_field import DiscreteTensorField check_instance(dfield, DiscreteTensorField) shape = dfield.shape init = npw.empty(shape=shape, dtype=object) for idx, df in dfield.nd_iter(): init[idx] = df.symbol return super().__new__(cls, shape=shape, init=init, **kwds) def __init__(self, dfield, name=None, **kwds): super().__init__(shape=None, init=None, **kwds)
[docs] def diff(F, *symbols, **assumptions): is_tensor = isinstance(F, npw.ndarray) if is_tensor: return F.view(TensorBase).diff(*symbols, **assumptions) else: return sm.diff(F, *symbols, **assumptions)
[docs] def grad(F, frame, axis=-1): is_tensor = isinstance(F, npw.ndarray) if is_tensor: shape = F.shape ndim = max(F.ndim, 1) axis = (axis + ndim) % ndim new_shape = shape[: axis + 1] + (frame.dim,) + shape[axis + 1 :] else: assert isinstance(F, sm.Expr) assert (axis == -1) or (axis == 0) shape = (1,) new_shape = (frame.dim,) gradF = npw.ndarray(shape=new_shape, dtype=object) for idx in npw.ndindex(*shape): for i, xp in enumerate(frame.coords): if is_tensor: new_idx = idx[: axis + 1] + (i,) + idx[axis + 1 :] gradF[new_idx] = diff(F[idx], xp) else: gradF[i] = diff(F, xp) return gradF.view(TensorBase)
[docs] def div(F, frame, axis=-1): if isinstance(F, npw.ndarray): assert F.shape[axis] == frame.dim shape = F.shape ndim = F.ndim axis = (axis + ndim) % ndim divF = npw.empty_like(F) for idx in npw.ndindex(*shape): divF[idx] = diff(F[idx], frame.coords[idx[axis]]) divF = divF.sum(axis=axis) try: if divF.size == 1: return divF.item() else: return divF.view(TensorBase) except AttributeError: return divF else: assert frame.dim == 1 return F.diff(frame.coords[0])
[docs] def rot(F, frame): F = npw.atleast_1d(F) assert F.ndim == 1, F.ndim assert frame.dim in (2, 3) X = frame.coords if frame.dim == 2: if F.size == 1: rotF = npw.asarray( [ +diff(F[0], X[1]), -diff(F[0], X[0]), ] ) return rotF.view(TensorBase) elif F.size == 2: return diff(F[1], X[0]) - diff(F[0], X[1]) else: raise ValueError(F.size) elif frame.dim == 3: if F.size == 3: rotF = npw.empty_like(F) rotF[0] = diff(F[2], X[1]) - diff(F[1], X[2]) rotF[1] = diff(F[0], X[2]) - diff(F[2], X[0]) rotF[2] = diff(F[1], X[0]) - diff(F[0], X[1]) return rotF.view(TensorBase) else: raise ValueError(F.size) else: raise ValueError(frame.dim)
[docs] def curl(*args, **kwds): return rot(*args, **kwds)
[docs] def laplacian(F, frame): return div(grad(F, frame), frame)
[docs] def convective_derivative(U, F, frame): return diff(F, frame.time) + grad(F, frame).dot(U)
if __name__ == "__main__": from hysop import Field, Box from hysop.tools.contexts import printoptions from hysop.tools.sympy_utils import enable_pretty_printing enable_pretty_printing() dim = 3 box = Box(length=(1,) * dim) frame = box.frame S0 = Field(domain=box, name="S0", nb_components=1, dtype=npw.float32) S1 = Field(domain=box, name="S1", nb_components=1, dtype=npw.float64) U = Field(domain=box, name="U", is_vector=True, dtype=npw.uint8) V = Field(domain=box, name="V", nb_components=2, dtype=npw.int32) s0 = S0.s s1 = S1.s u = U.s v = V.s assert s0.field is S0 assert s1.field is S1 assert u[0].field is U assert v[-1].field is V with printoptions(linewidth=1000): print(s0, s0.index, s0.field.short_description()) print() print(s1, s1.index, s1.field.short_description()) print() print(u[0], u[0].index, u[0].field.short_description()) print(u[1], u[1].index, u[1].field.short_description()) print(u[2], u[2].index, u[2].field.short_description()) print() print(v) print(v[0], v[0].index, v[0].field.short_description()) print(v[1], v[1].index, v[1].field.short_description()) print() print(s0(*frame.vars)) print(s1(*frame.vars)) print(u(*frame.vars)) print(v(*frame.vars)) print() print(grad(s0(*frame.vars), frame)) print() print(grad(s0(frame.coords[0]), frame)) print() print(grad(s1(frame.coords[-1]), frame)) print() print(grad(v(*frame.vars), frame)) print() print(grad(u(*frame.vars), frame)) print() print(grad(u(frame.coords[1]), frame)) print() print(div(u(*frame.vars), frame)) print() print(div(grad(s0(*frame.vars), frame), frame)) print() print(div(grad(v(*frame.vars), frame), frame)) print() print(laplacian(s0(*frame.vars), frame)) print() print(rot(u(*frame.vars), frame)) print() print(convective_derivative(u(*frame.vars), u(*frame.vars), frame)) print() print(npw.eye(8, dtype=npw.uint8).view(TensorBase).latex()) print() print(convective_derivative(u(*frame.vars), u(*frame.vars), frame).latex()) print() D = SymbolicTensor("D", shape=(3, 3)) print(D) print(div(D.dot(grad(s0(*frame.vars), frame)), frame)) print() print(div(D.dot(grad(u(*frame.vars), frame)), frame)) print() print(D.dot(grad(s0(*frame.vars), frame))) print() print(grad(div(u(*frame.vars), frame), frame) - div(grad(u(), frame), frame))